@Async异步任务与线程池 您所在的位置:网站首页 class level 区别 @Async异步任务与线程池

@Async异步任务与线程池

2023-08-21 23:06| 来源: 网络整理| 查看: 265

写在前面:本篇文章是关于使用@Async进行异步任务,并且关于线程池做了一个初步的梳理和总结,包括遇到过的一些坑

在工作中用到的一些线程池

以下代码已做脱敏处理

1.newCachedThreadPool

private void startTask(List usersList){ ExecutorService executor = Executors.newCachedThreadPool(); executor.submit(()->{ //do someting }); }

2.newScheduledThreadPool

@Configuration public class ScheduleConfig implements SchedulingConfigurer { @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { //当然了,这里设置的线程池是corePoolSize也是很关键了,自己根据业务需求设定 taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10)); } }

如果在idea中安装了阿里规范插件,就会发现上面两种创建线程池的方式都会报红。原因是:

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors返回的线程池对象的弊端如下:

FixedThreadPool和SingleThreadPool:

允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

CachedThreadPool:

允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

其实这里CachedThreadPool和newScheduledThreadPool是一样的,都是因为最大线程数被设置成了Integer.MAX_VALUE。

public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

在源码中可以看的出newCachedThreadPool使用的是synchronousqueue队列,也可以看作是一个长度为1的BlockingQueue所以,再加上最大允许线程数为Integer.MAX_VALUE,就导致可能会创建大量线程导致OOM。

同理ScheduledThreadPoolExecutor使用的是DelayedWorkQueue,初始化大小为16。当队列满后就会创建新线程,就导致可能会创建大量线程导致OOM。

我们不妨实际来测试一下,以newCachedThreadPool为例,jvm参数-Xms64m -Xmx192m -Xss1024K -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=128m。

@PostMapping("/newCachedThreadPoolExample") @ResponseBody public void newCachedThreadPoolExample(){ ExecutorService executorService = Executors.newCachedThreadPool(); while (true){ executorService.submit(()->{ log.info("submit:"+LocalDateTime.now()); try { Thread.sleep(1000); }catch (InterruptedException e){ e.printStackTrace(); } }); } }

刚启动时的情况:

x1.png

请求接口后就开始爆炸

x2.png 然后就开始卡着不动了

x3.png

比较尴尬的是一直没有出现报错OOM的情况,就直接卡死了。

x4.png 总结

以上的线程池虽然可以在外部限制的情况下避免OOM等情况,但是还是建议尽量根据自己的业务情况自定义线程池。

使用@Async快速创建一个异步任务

1. application.yml

这里是线程池相关配置,先不详细说,同理可以在代码里面配置config。

线程池缓冲队列的选择

以上发生的问题大多数都和线程池的缓冲队列有关,选择一个符合自己业务特点的缓冲队列也十分重要。

x5'.png

spring: task: execution: pool: # 最大线程数 max-size: 16 # 核心线程数 core-size: 16 # 存活时间 keep-alive: 10s # 队列大小 queue-capacity: 100 # 是否允许核心线程超时 allow-core-thread-timeout: true # 线程名称前缀 thread-name-prefix: async-task-

2.ThreadpoolApplication

这里需要在 Application上添加 @EnableAsync注解,开启异步任务。如果是选择在代码里面写config,则需要在config文件上添加@EnableAsync注解。

@EnableAsync @SpringBootApplication public class ThreadpoolApplication { public static void main(String[] args) { SpringApplication.run(ThreadpoolApplication.class, args); } }

3.AsyncTask

编写一个异步任务处理类,在需要开启异步的方法上面添加@Async

@Component @Slf4j public class AsyncTask { @Async public void asyncRun() throws InterruptedException { Thread.sleep(10); log.info(Thread.currentThread().getName()+":处理完成"); } }

4.AsyncService

编写一个调用异步方法的service

@Service @Slf4j public class AsyncService { @Autowired private AsyncTask asyncTask; public void asyncSimpleExample() { try { log.info("service start"); asyncTask.asyncRun(); log.info("service end"); }catch (InterruptedException e){ e.printStackTrace(); } } }

5.AsyncController

编写一个Controller去调用AsyncService

/** * @author kurtl */ @Controller @RequestMapping("/") public class AsyncController { @Autowired private AsyncService asyncService; @PostMapping("/asyncSimpleExample") @ResponseBody public void asyncSimpleExample(){ asyncService.asyncSimpleExample(); } }

最后请求这个接口

x6.png

可以看到,先输出了asyncSimpleExample里面打印的service start与service end,表示service方法先执行完毕了,而异步方法则在调用后进行了一个sleep,service没有同步等待sleep完成,而是直接返回,表示这个是异步任务。至此我们已经通过@Async成功创建的异步任务。

关于@Async和@EnableAsync的原理

个人觉得源码中很重要的一部分就是源码中的注释,阅读注释也可以帮你快速了解源码的作用等,所有我会把重要的注释稍微翻译一下

1.@Async源码

@Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Async { /** * A qualifier value for the specified asynchronous operation(s). *

May be used to determine the target executor to be used when executing * the asynchronous operation(s), matching the qualifier value (or the bean * name) of a specific {@link java.util.concurrent.Executor Executor} or * {@link org.springframework.core.task.TaskExecutor TaskExecutor} * bean definition. *

When specified on a class-level {@code @Async} annotation, indicates that the * given executor should be used for all methods within the class. Method-level use * of {@code Async#value} always overrides any value set at the class level. * @since 3.1.2 */ /** * 在这些注释中有三个非常重要的部分 * 1.使用@Async的方法只能返回Void 或者 Future类型 * 2.表明了@Async是通过org.springframework.core.task.TaskExecutor * 或者java.util.concurrent.Executor来创建线程池 * 3.写了@Async的作用范围 在类上使用@Async会覆盖方法上的@Async */ String value() default ""; }

2.@EnableAsync源码

/** * Enables Spring's asynchronous method execution capability, similar to functionality * found in Spring's {@code } XML namespace. * *

To be used together with @{@link Configuration Configuration} classes as follows, * enabling annotation-driven async processing for an entire Spring application context: * * * @Configuration * @EnableAsync * public class AppConfig { * * } * 这里表示需要联合@Configuration注解一起使用,所以@EnableAsync应该 * 添加在线程池Config或者SpringBootApplication 上 * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous} * annotation, or any custom annotation specified via the {@link #annotation} attribute. * The aspect is added transparently for any registered bean, for instance via this * configuration: * * * @Configuration * public class AnotherAppConfig { * * @Bean * public MyAsyncBean asyncBean() { * return new MyAsyncBean(); * } * } * *

By default, Spring will be searching for an associated thread pool definition: * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context, * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} * 默认情况下spring会先搜索TaskExecutor类型的bean或者名字为 * taskExecutor的Executor类型的bean,都不存在使用 * SimpleAsyncTaskExecutor执行器但是这个SimpleAsyncTaskExecutor实际 * 上是有很大的坑的,建议是自定义一个线程池,这个后面会说 * will be used to process async method invocations. Besides, annotated methods having * * @author Chris Beams * @author Juergen Hoeller * @author Stephane Nicoll * @author Sam Brannen * @since 3.1 * @see Async * @see AsyncConfigurer * @see AsyncConfigurationSelector */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { /** * Indicate the 'async' annotation type to be detected at either class * or method level. *

By default, both Spring's @{@link Async} annotation and the EJB 3.1 * {@code @javax.ejb.Asynchronous} annotation will be detected. *

This attribute exists so that developers can provide their own * custom annotation type to indicate that a method (or all methods of * a given class) should be invoked asynchronously. */ Class submit(Runnable task) { FutureTask future = new FutureTask(task, null); execute(future, TIMEOUT_INDEFINITE); return future; } @Override public Future submit(Callable task) { FutureTask future = new FutureTask(task); execute(future, TIMEOUT_INDEFINITE); return future; } @Override public ListenableFuture submitListenable(Runnable task) { ListenableFutureTask future = new ListenableFutureTask(task, null); execute(future, TIMEOUT_INDEFINITE); return future; } @Override public ListenableFuture submitListenable(Callable task) { ListenableFutureTask future = new ListenableFutureTask(task); execute(future, TIMEOUT_INDEFINITE); return future; } /** * Template method for the actual execution of a task. *

The default implementation creates a new Thread and starts it. * @param task the Runnable to execute * @see #setThreadFactory * @see #createThread * @see java.lang.Thread#start() */ //判断是否有工厂,没有的话调用父类创建线程 protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); } /** * Subclass of the general ConcurrencyThrottleSupport class, * making {@code beforeAccess()} and {@code afterAccess()} * visible to the surrounding class. */ private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport { @Override protected void beforeAccess() { super.beforeAccess(); } @Override protected void afterAccess() { super.afterAccess(); } } /** * This Runnable calls {@code afterAccess()} after the * target Runnable has finished its execution. */ private class ConcurrencyThrottlingRunnable implements Runnable { private final Runnable target; public ConcurrencyThrottlingRunnable(Runnable target) { this.target = target; } @Override public void run() { try { this.target.run(); } finally { concurrencyThrottle.afterAccess(); } } } }

最主要的就是这段代码

/** * Template method for the actual execution of a task. *

The default implementation creates a new Thread and starts it. * @param task the Runnable to execute * @see #setThreadFactory * @see #createThread * @see java.lang.Thread#start() */ //判断是否有工厂,没有的话调用父类创建线程 protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); }

这里并不是用线程池,而是直接创建新的线程,所以会大量创建线程导致OOM。其实这个类是可以通过setConcurrencyLimit设置最大线程数,通过synchronized和wati and notify去进行限流,这里不展开讲。所以结论是在使用@Async一定要设置线程池。

@Async异步失效

以下代码已做脱敏处理

在看公司代码的时候,发现这样一段代码

public UserVO saveUser(HttpServletRequest request, String source) { String token = RequestUtils.getToken(request); String uid = checkUserLoginReturnUid(token); log.info("注册登录, token : {}, uid : {}", token, uid); //获取用户信息 User User = getLoginUser(uid); if(User == null){ User = new User(); //获取用户信息 Map userMap = redisTemplateMain.getUserMapByToken(token); //保存用户 saveUser(User, userMap, source); sendUserSystem(Integer.valueOf(userMap.get("id"))); } //用户信息放进缓存 setAuth2Redis(User); return setUser2Redis(User); } //通知用户系统,我们这边成功注册了一个用户 @Async public void sendUserSystem(Integer userId){ Map map = new HashMap(); map.put("mainUid", userId); map.put("source", ""); String json = HttpUtil.post(property.userRegisterSendSystem, map); log.info("sendUserSystem userId : {}, json : {}", userId, json); }

在之前我们看源码的时候已经知道了,由于@Async的AdviceMode默认为PROXY,所以当调用方和被调用方是在同一个类中,无法产生切面,@Async没有被Spring容器管理。 所以这个方法跑了这么久一直是同步。

我们可以写一个方法去测试一下。

public void asyncInvalid() { try { log.info("service start"); asyncInvalidExample(); log.info("service end"); }catch (InterruptedException e){ e.printStackTrace(); } } @Async public void asyncInvalidExample() throws InterruptedException{ Thread.sleep(10); log.info(Thread.currentThread().getName()+":处理完成"); }

调用结果很明显,没有进行异步操作,而是同步。

x7.1.png

线程池拒绝导致线程丢失

既然线程池都已一个缓冲队列来保存未被消费的任务,那么就一定存在队列被塞满,导致线程丢失的情况。我们写一段代码模拟一下。

配置文件

spring: task: execution: pool: # 最大线程数 max-size: 16 # 核心线程数 core-size: 16 # 存活时间 keep-alive: 10s # 队列大小 queue-capacity: 100 # 是否允许核心线程超时 allow-core-thread-timeout: true # 线程名称前缀 thread-name-prefix: async-task-

异步方法

@Async public void asyncRefuseRun() throws InterruptedException { Thread.sleep(5000000); }

调用方法

public void asyncRefuseRun() { for (int t = 0;t


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有